823e9cf358df7d6131b8db4ddd6cb9ade4e7baf5,src/org/jgroups/protocols/UNICAST.java,UNICAST,handleDataReceived,#Address#number#number#boolean#Message#,487
Before Change
return;
}
if(!added && !win.hasMessagesToRemove()) { // no ack if we didn't add the msg (e.g. duplicate)
return;
}
final AtomicBoolean processing=win.getProcessing();
if(!processing.compareAndSet(false, true)) {
return;
}
// Try to remove (from the AckReceiverWindow) as many messages as possible and pass them up
boolean released_processing=false;
int num_regular_msgs_removed=0;
// Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
// this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
// where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
// We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
// delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
// order in which they were sent by their senders
try {
while(true) {
Message m=win.remove(processing);
if(m == null) {
released_processing=true;
return;
}
// discard OOB msg as it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
if(m.isFlagSet(Message.OOB)) {
continue;
}
num_regular_msgs_removed++;
sendAckForMessage(m);
up_prot.up(new Event(Event.MSG, m));
}
}
finally {
After Change
}
}
if(send_request_for_first_seqno != null) {
sendRequestForFirstSeqno(send_request_for_first_seqno);
return;
}
byte result=win.add2(seqno, msg); // win is guaranteed to be non-null if we get here
boolean added=result > 0;
num_msgs_received++;
num_bytes_received+=msg.getLength();
if(added && !msg.isFlagSet(Message.OOB))
undelivered_msgs.incrementAndGet();
// Cannot be replaced with if(!added), see https://jira.jboss.org/jira/browse/JGRP-1043 comment 15/Sep/09 06:57 AM
// We *have* to send the ACK, to cover the following scenario:
// - A sends #3 to B
// - B removes #3 and sends ACK(3) to A. B's next_to_remove is now 4
// - B's ACK(3) to A is dropped by the network
// - A keeps retransmitting #3 to B, until it gets an ACK(3)
// - B will never ACK #3 if the 2 lines below are commented out ==> endless retransmission of A's #3 !
if(result == -1) { // only ack if seqno was < next_to_remove !
sendAck(msg.getSrc(), seqno);
}
// message is passed up if OOB. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
if(msg.isFlagSet(Message.OOB)) {
if(added)
up_prot.up(new Event(Event.MSG, msg));
Message oob_msg=win.removeOOBMessage();
if(!(undelivered_msgs.get() > 0 && win.hasMessagesToRemove())) {
if(oob_msg != null)
sendAckForMessage(oob_msg);
return;
}
}
final AtomicBoolean processing=win.getProcessing();
if(!processing.compareAndSet(false, true)) {
return;
}
// Try to remove (from the AckReceiverWindow) as many messages as possible and pass them up
boolean released_processing=false;
int num_regular_msgs_removed=0;
// Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);
// this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),
// where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time
// We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in
// delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered only in the
// order in which they were sent by their senders
try {
while(true) {
List<Message> msgs=win.removeMany(processing);
if(msgs.isEmpty()) {
released_processing=true;
return;
}
Message highest_removed=msgs.get(msgs.size() -1);
sendAckForMessage(highest_removed); // guaranteed not to throw an exception !
for(Message m: msgs) {
// discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
if(m.isFlagSet(Message.OOB))
continue;
num_regular_msgs_removed++;
try {
up_prot.up(new Event(Event.MSG, m));
}
catch(Throwable t) {
log.error("couldn't deliver message " + m, t);